{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Serving a custom model\n",
    "\n",
    "The `mlserver` package comes with inference runtime implementations for `scikit-learn` and `xgboost` models.\n",
    "However, sometimes we may also need to roll out our own inference server, with custom logic to perform inference.\n",
    "To support this scenario, MLServer makes it easy to create your own extensions, which can then be containerised and deployed in a production environment."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Overview\n",
    "\n",
    "In this example, we will train a [`numpyro` model](http://num.pyro.ai/en/stable/). \n",
    "The `numpyro` library streamlines the implementation of probabilistic models, abstracting away advanced inference and training algorithms.\n",
    "\n",
    "Out of the box, `mlserver` doesn't provide an inference runtime for `numpyro`.\n",
    "However, through this example we will see how easy it is to develop our own."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Training\n",
    "\n",
    "The first step will be to train our model.\n",
    "This will be a very simple Bayesian regression model, based on an example provided in the [`numpyro` docs](https://nbviewer.jupyter.org/github/pyro-ppl/numpyro/blob/master/notebooks/source/bayesian_regression.ipynb).\n",
    "\n",
    "Since this is a probabilistic model, during training we will compute an approximation to the posterior distribution of our model using MCMC."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Original source code and more details can be found in:\n",
    "# https://nbviewer.jupyter.org/github/pyro-ppl/numpyro/blob/master/notebooks/source/bayesian_regression.ipynb\n",
    "\n",
    "\n",
    "import numpyro\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "\n",
    "from numpyro import distributions as dist\n",
    "from jax import random\n",
    "from numpyro.infer import MCMC, NUTS\n",
    "\n",
    "DATASET_URL = 'https://raw.githubusercontent.com/rmcelreath/rethinking/master/data/WaffleDivorce.csv'\n",
    "dset = pd.read_csv(DATASET_URL, sep=';')\n",
    "\n",
    "standardize = lambda x: (x - x.mean()) / x.std()\n",
    "\n",
    "dset['AgeScaled'] = dset.MedianAgeMarriage.pipe(standardize)\n",
    "dset['MarriageScaled'] = dset.Marriage.pipe(standardize)\n",
    "dset['DivorceScaled'] = dset.Divorce.pipe(standardize)\n",
    "\n",
    "def model(marriage=None, age=None, divorce=None):\n",
    "    a = numpyro.sample('a', dist.Normal(0., 0.2))\n",
    "    M, A = 0., 0.\n",
    "    if marriage is not None:\n",
    "        bM = numpyro.sample('bM', dist.Normal(0., 0.5))\n",
    "        M = bM * marriage\n",
    "    if age is not None:\n",
    "        bA = numpyro.sample('bA', dist.Normal(0., 0.5))\n",
    "        A = bA * age\n",
    "    sigma = numpyro.sample('sigma', dist.Exponential(1.))\n",
    "    mu = a + M + A\n",
    "    numpyro.sample('obs', dist.Normal(mu, sigma), obs=divorce)\n",
    "\n",
    "# Start from this source of randomness. We will split keys for subsequent operations.\n",
    "rng_key = random.PRNGKey(0)\n",
    "rng_key, rng_key_ = random.split(rng_key)\n",
    "\n",
    "num_warmup, num_samples = 1000, 2000\n",
    "\n",
    "# Run NUTS.\n",
    "kernel = NUTS(model)\n",
    "mcmc = MCMC(kernel, num_warmup=num_warmup, num_samples=num_samples)\n",
    "mcmc.run(rng_key_, marriage=dset.MarriageScaled.values, divorce=dset.DivorceScaled.values)\n",
    "mcmc.print_summary()"
   ]
  },
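  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The summary printed above reports, for each latent site, statistics computed across the posterior draws.\n",
    "As a rough illustration of what those columns mean, the sketch below computes the mean, standard deviation, median and a 90% interval from a dictionary of 1-D sample arrays using plain NumPy.\n",
    "The helper `summarise_samples` and the draws in `demo_samples` are hypothetical placeholders (arbitrary synthetic numbers, not output of the model above). The interval here is equal-tailed, whereas NumPyro may compute its interval differently (e.g. as a highest-density interval), and the `n_eff` and `r_hat` diagnostics are omitted.\n",
    "Calling `summarise_samples(mcmc.get_samples())` would apply the same helper to the actual posterior."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "\n",
    "def summarise_samples(samples, prob=0.9):\n",
    "    # Hypothetical helper: per-site summary of 1-D posterior draws\n",
    "    lower_q = (1.0 - prob) / 2.0\n",
    "    upper_q = 1.0 - lower_q\n",
    "    summary = {}\n",
    "    for site, draws in samples.items():\n",
    "        draws = np.asarray(draws)\n",
    "        summary[site] = {\n",
    "            \"mean\": float(draws.mean()),\n",
    "            \"std\": float(draws.std()),\n",
    "            \"median\": float(np.median(draws)),\n",
    "            # Equal-tailed interval between the two quantiles\n",
    "            \"lower\": float(np.quantile(draws, lower_q)),\n",
    "            \"upper\": float(np.quantile(draws, upper_q)),\n",
    "        }\n",
    "    return summary\n",
    "\n",
    "\n",
    "# Arbitrary synthetic draws standing in for `mcmc.get_samples()` (illustration only)\n",
    "demo_rng = np.random.default_rng(0)\n",
    "demo_samples = {\n",
    "    \"a\": demo_rng.normal(size=2000),\n",
    "    \"bM\": demo_rng.normal(size=2000),\n",
    "    \"sigma\": demo_rng.exponential(1.0, size=2000),\n",
    "}\n",
    "for site, site_summary in summarise_samples(demo_samples).items():\n",
    "    print(site, {k: round(v, 3) for k, v in site_summary.items()})"
   ]
  },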
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Saving our trained model\n",
    "\n",
    "Now that we have _trained_ our model, the next step will be to save it so that it can be loaded afterwards at serving-time.\n",
    "Note that, since this is a probabilistic model, we only need to save the MCMC samples that approximate the posterior distribution over the latent parameters.\n",
    "\n",
    "These will be saved to a `numpyro-divorce.json` file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "\n",
    "samples = mcmc.get_samples()\n",
    "serialisable = {}\n",
    "for k, v in samples.items():\n",
    "    serialisable[k] = np.asarray(v).tolist()\n",
    "    \n",
    "model_file_name = \"numpyro-divorce.json\"\n",
    "with open(model_file_name, 'w') as model_file:\n",
    "    json.dump(serialisable, model_file)"
   ]
  },
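  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "JAX arrays are not JSON-serialisable, which is why the cell above converts each set of samples to a nested Python list.\n",
    "Since that conversion drops the dtype, it can be worth checking that the file restores the same shapes and values before relying on it at serving time.\n",
    "The sketch below performs such a round trip in a temporary directory; the helpers `save_samples` and `load_samples` are hypothetical names and the arrays are synthetic placeholders."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "import os\n",
    "import tempfile\n",
    "\n",
    "import numpy as np\n",
    "\n",
    "\n",
    "def save_samples(samples, path):\n",
    "    # Convert every array to nested lists, mirroring the cell above\n",
    "    with open(path, \"w\") as f:\n",
    "        json.dump({k: np.asarray(v).tolist() for k, v in samples.items()}, f)\n",
    "\n",
    "\n",
    "def load_samples(path):\n",
    "    # Rebuild NumPy arrays, as the serving runtime's `load()` method does\n",
    "    with open(path) as f:\n",
    "        return {k: np.array(v) for k, v in json.load(f).items()}\n",
    "\n",
    "\n",
    "demo_original = {\n",
    "    \"a\": np.linspace(-1.0, 1.0, 5, dtype=np.float32),\n",
    "    \"sigma\": np.ones(5, dtype=np.float32),\n",
    "}\n",
    "with tempfile.TemporaryDirectory() as tmp_dir:\n",
    "    tmp_path = os.path.join(tmp_dir, \"samples.json\")\n",
    "    save_samples(demo_original, tmp_path)\n",
    "    demo_restored = load_samples(tmp_path)\n",
    "\n",
    "for k in demo_original:\n",
    "    # Shapes and values survive; the dtype comes back as float64 after parsing JSON\n",
    "    assert demo_restored[k].shape == demo_original[k].shape\n",
    "    np.testing.assert_allclose(demo_restored[k], demo_original[k])\n",
    "print({k: (v.shape, str(v.dtype)) for k, v in demo_restored.items()})"
   ]
  },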
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Serving\n",
    "\n",
    "The next step will be to serve our model using `mlserver`.\n",
    "For that, we will first implement an extension which will serve as the _runtime_ to perform inference using our custom `numpyro` model."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Custom inference runtime\n",
    "\n",
    "Our custom inference wrapper should be responsible for:\n",
    "\n",
    "- Loading the model from the set of samples we saved previously.\n",
    "- Running inference using our model structure and the posterior approximated by the samples.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile models.py\n",
    "import json\n",
    "import numpyro\n",
    "import numpy as np\n",
    "\n",
    "from typing import Dict\n",
    "from jax import random\n",
    "from mlserver import MLModel, types\n",
    "from mlserver.utils import get_model_uri\n",
    "from numpyro.infer import Predictive\n",
    "from numpyro import distributions as dist\n",
    "\n",
    "\n",
    "class NumpyroModel(MLModel):\n",
    "    async def load(self) -> bool:\n",
    "        model_uri = await get_model_uri(self._settings)\n",
    "        with open(model_uri) as model_file:\n",
    "            raw_samples = json.load(model_file)\n",
    "\n",
    "        self._samples = {}\n",
    "        for k, v in raw_samples.items():\n",
    "            self._samples[k] = np.array(v)\n",
    "\n",
    "        self._predictive = Predictive(self._model, self._samples)\n",
    "\n",
    "        self.ready = True\n",
    "        return self.ready\n",
    "\n",
    "    async def predict(self, payload: types.InferenceRequest) -> types.InferenceResponse:\n",
    "        inputs = self._extract_inputs(payload)\n",
    "        predictions = self._predictive(rng_key=random.PRNGKey(0), **inputs)\n",
    "\n",
    "        # `obs` has shape (num_samples, num_inputs): average over the posterior\n",
    "        # draws only, so that each input row gets its own predicted mean\n",
    "        obs = predictions[\"obs\"]\n",
    "        obs_mean = np.asarray(obs.mean(axis=0))\n",
    "\n",
    "        return types.InferenceResponse(\n",
    "            id=payload.id,\n",
    "            model_name=self.name,\n",
    "            model_version=self.version,\n",
    "            outputs=[\n",
    "                types.ResponseOutput(\n",
    "                    name=\"obs_mean\",\n",
    "                    shape=list(obs_mean.shape),\n",
    "                    datatype=\"FP32\",\n",
    "                    data=obs_mean.tolist(),\n",
    "                )\n",
    "            ],\n",
    "        )\n",
    "\n",
    "    def _extract_inputs(self, payload: types.InferenceRequest) -> Dict[str, np.ndarray]:\n",
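    "        # Each request input is passed to `_model` as a keyword argument of the\n",
    "        # same name (e.g. `marriage`), so input names must match those arguments\n",
    "        inputs = {}\n",
    "        for inp in payload.inputs:\n",
    "            inputs[inp.name] = np.array(inp.data)\n",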
    "\n",
    "        return inputs\n",
    "\n",
    "    def _model(self, marriage=None, age=None, divorce=None):\n",
    "        a = numpyro.sample(\"a\", dist.Normal(0.0, 0.2))\n",
    "        M, A = 0.0, 0.0\n",
    "        if marriage is not None:\n",
    "            bM = numpyro.sample(\"bM\", dist.Normal(0.0, 0.5))\n",
    "            M = bM * marriage\n",
    "        if age is not None:\n",
    "            bA = numpyro.sample(\"bA\", dist.Normal(0.0, 0.5))\n",
    "            A = bA * age\n",
    "        sigma = numpyro.sample(\"sigma\", dist.Exponential(1.0))\n",
    "        mu = a + M + A\n",
    "        numpyro.sample(\"obs\", dist.Normal(mu, sigma), obs=divorce)"
   ]
  },
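  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Conceptually, `Predictive` replays the model once per posterior draw: for each stored `(a, bM, sigma)` triple it samples `obs` from `Normal(a + bM * marriage, sigma)`, and `predict()` then averages those simulated observations over the sample axis.\n",
    "The NumPy sketch below mimics that computation for the `marriage`-only case to make the shapes explicit.\n",
    "It is an illustrative re-implementation under that assumption, not NumPyro's code; the helper `posterior_predictive_mean` and the synthetic posterior in `demo_posterior` are hypothetical."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "\n",
    "def posterior_predictive_mean(samples, marriage, seed=0):\n",
    "    # Hypothetical helper: mimics Predictive(...)[\"obs\"].mean(axis=0) for the marriage-only model\n",
    "    rng = np.random.default_rng(seed)\n",
    "    marriage = np.asarray(marriage, dtype=float)\n",
    "    a = np.asarray(samples[\"a\"])[:, None]          # (num_samples, 1)\n",
    "    b_m = np.asarray(samples[\"bM\"])[:, None]       # (num_samples, 1)\n",
    "    sigma = np.asarray(samples[\"sigma\"])[:, None]  # (num_samples, 1)\n",
    "    mu = a + b_m * marriage                        # broadcasts to (num_samples, num_inputs)\n",
    "    obs = rng.normal(mu, sigma)                    # one simulated observation per draw and input\n",
    "    return obs.mean(axis=0)                        # average over draws -> (num_inputs,)\n",
    "\n",
    "\n",
    "# Synthetic posterior draws (illustration only, not fitted values)\n",
    "demo_rng = np.random.default_rng(1)\n",
    "demo_posterior = {\n",
    "    \"a\": demo_rng.normal(0.0, 0.1, size=1000),\n",
    "    \"bM\": demo_rng.normal(0.5, 0.1, size=1000),\n",
    "    \"sigma\": np.full(1000, 0.1),\n",
    "}\n",
    "print(posterior_predictive_mean(demo_posterior, [-1.0, 0.0, 1.0]))"
   ]
  },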
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Settings files\n",
    "\n",
    "The next step will be to create two configuration files:\n",
    "\n",
    "- `settings.json`: holds the configuration of our server (e.g. ports, log level, etc.).\n",
    "- `model-settings.json`: holds the configuration of our model (e.g. input type, runtime to use, etc.)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### `settings.json`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile settings.json\n",
    "{\n",
    "    \"debug\": true\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### `model-settings.json`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile model-settings.json\n",
    "{\n",
    "    \"name\": \"numpyro-divorce\",\n",
    "    \"implementation\": \"models.NumpyroModel\",\n",
    "    \"parameters\": {\n",
    "        \"uri\": \"./numpyro-divorce.json\"\n",
    "    }\n",
    "}"
   ]
  },
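  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `implementation` field above is a dotted import path: `models.NumpyroModel` points at the `NumpyroModel` class inside the `models.py` file written earlier, so that file must be importable when the server starts.\n",
    "As a simplified illustration of how such a path can be turned into a class (not MLServer's actual loading code), the sketch below uses `importlib`; `resolve_implementation` is a hypothetical helper."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import importlib\n",
    "\n",
    "\n",
    "def resolve_implementation(dotted_path):\n",
    "    # Split \"package.module.ClassName\" into a module path and an attribute name\n",
    "    module_name, _, attr_name = dotted_path.rpartition(\".\")\n",
    "    if not module_name:\n",
    "        raise ValueError(f\"Expected a dotted path, got {dotted_path!r}\")\n",
    "    module = importlib.import_module(module_name)\n",
    "    return getattr(module, attr_name)\n",
    "\n",
    "\n",
    "# Standard-library example; \"models.NumpyroModel\" would resolve the same way\n",
    "# as long as models.py is importable (e.g. its folder is on sys.path)\n",
    "print(resolve_implementation(\"collections.OrderedDict\"))"
   ]
  },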
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Start serving our model\n",
    "\n",
    "Now that we have our config in place, we can start the server by running `mlserver start .`. This must either be run from the directory that contains our config files, or be given the path to that folder instead of `.`.\n",
    "\n",
    "```shell\n",
    "mlserver start .\n",
    "```\n",
    "\n",
    "Since this command starts the server and blocks the terminal while it waits for requests, it will need to be run in a separate terminal (or in the background)."
   ]
  },
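  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because the server runs in a separate process, it can take a moment before it accepts requests.\n",
    "The V2 Inference Protocol defines health endpoints such as `GET /v2/health/ready` (server) and `GET /v2/models/{model_name}/ready` (model), so one option is to poll until the model reports ready.\n",
    "The sketch below does this with the standard library only; the helper `wait_until_ready`, its timeout values, and the assumption that the server listens on `localhost:8080` (the address used in the requests below) are ours."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import urllib.request\n",
    "\n",
    "\n",
    "def wait_until_ready(url, timeout=30.0, interval=1.0):\n",
    "    # Poll `url` until it answers HTTP 200 or `timeout` seconds elapse\n",
    "    deadline = time.monotonic() + timeout\n",
    "    while time.monotonic() < deadline:\n",
    "        try:\n",
    "            with urllib.request.urlopen(url, timeout=interval) as resp:\n",
    "                if resp.status == 200:\n",
    "                    return True\n",
    "        except OSError:\n",
    "            # URLError, HTTPError and socket timeouts are all OSError subclasses:\n",
    "            # the server is not up yet, so try again after a short pause\n",
    "            pass\n",
    "        time.sleep(interval)\n",
    "    return False\n",
    "\n",
    "\n",
    "# Assumes the server started with `mlserver start .` listens on localhost:8080\n",
    "model_ready_url = \"http://localhost:8080/v2/models/numpyro-divorce/ready\"\n",
    "print(wait_until_ready(model_ready_url, timeout=5.0))"
   ]
  },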
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Send test inference request\n",
    "\n",
    "\n",
    "We now have our model being served by `mlserver`.\n",
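    "To make sure that everything is working as expected, let's send a test request.\n",
    "\n",
    "For that, we can use the Python types that `mlserver` provides out of the box, or we can build our request manually.\n",
    "\n",
    "Note that the model was trained on standardised marriage rates (`MarriageScaled`), so the raw value used below (`28.0`) lies far outside the range seen during training and the resulting prediction is an extrapolation."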
   ]
  },
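  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because the training cell standardised the marriage rates before fitting, a raw value can be brought onto the same scale with the training mean and standard deviation before building the request.\n",
    "The sketch below shows that step; `standardise_like` and the reference values are hypothetical, and in this notebook `dset.Marriage.values` would be the natural reference."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "\n",
    "def standardise_like(values, reference):\n",
    "    # Same transform as `standardize` in the training cell: (x - mean) / std,\n",
    "    # using ddof=1 to match the default of pandas' Series.std()\n",
    "    reference = np.asarray(reference, dtype=float)\n",
    "    mean, std = reference.mean(), reference.std(ddof=1)\n",
    "    return ((np.asarray(values, dtype=float) - mean) / std).tolist()\n",
    "\n",
    "\n",
    "# Synthetic reference values (illustration only); in this notebook you could use\n",
    "# `dset.Marriage.values` so that requests match the scale seen during training\n",
    "demo_reference = [15.0, 20.0, 25.0]\n",
    "print(standardise_like([28.0], demo_reference))  # -> [1.6]"
   ]
  },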
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import requests\n",
    "\n",
    "x_0 = [28.0]\n",
    "inference_request = {\n",
    "    \"inputs\": [\n",
    "        {\n",
    "          \"name\": \"marriage\",\n",
    "          \"shape\": [1],\n",
    "          \"datatype\": \"FP32\",\n",
    "          \"data\": x_0\n",
    "        }\n",
    "    ]\n",
    "}\n",
    "\n",
    "endpoint = \"http://localhost:8080/v2/models/numpyro-divorce/infer\"\n",
    "response = requests.post(endpoint, json=inference_request)\n",
    "\n",
    "response.json()"
   ]
  },
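  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The JSON body returned by the server follows the V2 Inference Protocol: an `outputs` list whose entries carry a `name`, `shape`, `datatype` and flat `data`.\n",
    "A small helper can turn the `obs_mean` output back into a NumPy array with the declared shape.\n",
    "The sketch below assumes that response layout; `output_as_array` is a hypothetical name and the example payload is made up for illustration. In this notebook, `output_as_array(response.json(), \"obs_mean\")` would apply it to the real response."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "\n",
    "def output_as_array(response_body, output_name):\n",
    "    # Find the named output in a V2 response and reshape its flat data\n",
    "    for output in response_body[\"outputs\"]:\n",
    "        if output[\"name\"] == output_name:\n",
    "            return np.asarray(output[\"data\"]).reshape(output[\"shape\"])\n",
    "    raise KeyError(f\"No output named {output_name!r}\")\n",
    "\n",
    "\n",
    "# Made-up payload with the same layout as `response.json()` above\n",
    "demo_body = {\n",
    "    \"model_name\": \"numpyro-divorce\",\n",
    "    \"outputs\": [\n",
    "        {\"name\": \"obs_mean\", \"shape\": [1], \"datatype\": \"FP32\", \"data\": [0.25]}\n",
    "    ],\n",
    "}\n",
    "print(output_as_array(demo_body, \"obs_mean\"))"
   ]
  },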
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Deployment\n",
    "\n",
    "Now that we have written and tested our custom model, the next step is to deploy it.\n",
    "To do so, we will first build a custom image containing our code and then deploy that image.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Building a custom image"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```{note}\n",
    "This section expects that Docker is available and running in the background. \n",
    "```\n",
    "\n",
    "MLServer offers helpers to build a custom Docker image containing your code.\n",
    "In this example, we will use the `mlserver build` subcommand to create an image, which we'll be able to deploy later."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "%%bash\n",
    "mlserver build . -t 'my-custom-numpyro-server:0.1.0'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To ensure that the image is fully functional, we can spin up a container and then send a test request. To start the container, you can run something along the following lines in a separate terminal:\n",
    "\n",
    "```bash\n",
    "docker run -it --rm -p 8080:8080 my-custom-numpyro-server:0.1.0\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import requests\n",
    "\n",
    "x_0 = [28.0]\n",
    "inference_request = {\n",
    "    \"inputs\": [\n",
    "        {\n",
    "          \"name\": \"marriage\",\n",
    "          \"shape\": [1],\n",
    "          \"datatype\": \"FP32\",\n",
    "          \"data\": x_0\n",
    "        }\n",
    "    ]\n",
    "}\n",
    "\n",
    "endpoint = \"http://localhost:8080/v2/models/numpyro-divorce/infer\"\n",
    "response = requests.post(endpoint, json=inference_request)\n",
    "\n",
    "print(response)\n",
    "print(response.text)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As we can see, the server running within our Docker image responds as expected."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Deploying our custom image\n",
    "\n",
    "```{note}\n",
    "This section expects access to a functional Kubernetes cluster with Seldon Core installed and some familiarity with `kubectl`. \n",
    "```\n",
    "\n",
    "Now that we've built a custom image and verified that it works as expected, we can move to the next step and deploy it.\n",
    "There are many tools available for deploying images.\n",
    "However, for our example, we will focus on deploying it to a cluster running [Seldon Core](https://docs.seldon.io/projects/seldon-core/en/latest/).\n",
    "\n",
    "For that, we will need to create a `SeldonDeployment` resource which instructs Seldon Core to deploy a model embedded within our custom image and compliant with the [V2 Inference Protocol](https://github.com/kserve/kserve/tree/master/docs/predict-api/v2).\n",
    "This can be achieved by _applying_ (i.e. `kubectl apply`) a `SeldonDeployment` manifest to the cluster, similar to the one below:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile seldondeployment.yaml\n",
    "apiVersion: machinelearning.seldon.io/v1\n",
    "kind: SeldonDeployment\n",
    "metadata:\n",
    "  name: numpyro-model\n",
    "spec:\n",
    "  protocol: v2\n",
    "  predictors:\n",
    "    - name: default\n",
    "      graph:\n",
    "        name: numpyro-divorce\n",
    "        type: MODEL\n",
    "      componentSpecs:\n",
    "        - spec:\n",
    "            containers:\n",
    "              - name: numpyro-divorce\n",
    "                image: my-custom-numpyro-server:0.1.0"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
